package org.study;

import org.apache.commons.lang.time.StopWatch;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.*;
import org.apache.hadoop.hbase.client.*;
import org.apache.hadoop.hbase.filter.*;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

/**
 * 学习hbase对表进行CURD的API
 */
public class TableDemo {


    Connection connection = null;
    Admin admin = null;

    @Before
    public void connect() throws IOException {
        // 获取HBase的配置
        Configuration conf = HBaseConfiguration.create();
        // 连接hbase不是直接连接它的主节点，而是要连接zookeeper，通过zookeeper获取hbase的主节点
        // 配置zookeeper的连接地址
        conf.set("hbase.zookeeper.quorum", HbaseConfig.get("hbase.zookeeper.quorum"));
        conf.set("hbase.zookeeper.property.clientPort", HbaseConfig.get("hbase.zookeeper.property.clientPort"));
        // 发起连接
        connection = ConnectionFactory.createConnection(conf);
        // 获取管理权
        admin = connection.getAdmin();
    }

    @After
    public void close() throws IOException {
        if (null != admin) {
            admin.close();
        }
        if (null != connection) {
            connection.close();
        }
    }

    /**
<<<<<<< HEAD
     * 创建表
     * @throws IOException
=======
     * 生成测试数据，添加百万条数据
     */
    @Test
    public void generatorData() throws IOException {
        if (!admin.tableExists(TableName.valueOf("people"))) {
            ColumnFamilyDescriptor columnFamily1 = ColumnFamilyDescriptorBuilder.newBuilder("basic".getBytes(StandardCharsets.UTF_8)).build();
            TableDescriptor table = TableDescriptorBuilder.newBuilder(TableName.valueOf("default", "people"))
                    .setColumnFamily(columnFamily1).build();
            admin.createTable(table);
        }

        // 构建Append对象
        Table table = connection.getTable(TableName.valueOf("person"));
        List<Put> puts = new ArrayList<>();

        long startTime = System.currentTimeMillis();
        for (int row = 0; row < 1000000; row++) {
            byte[] rowkey = ("row" + row).getBytes(StandardCharsets.UTF_8);
            Put put = new Put(rowkey);
            // 指定列族
            byte[] basic = "basic".getBytes(StandardCharsets.UTF_8);
            // 指定列和数据
            put.addColumn(basic, "password1".getBytes(StandardCharsets.UTF_8), getPassword());
            put.addColumn(basic, "password2".getBytes(StandardCharsets.UTF_8), getPassword());
            puts.add(put);

            // 每1000条向HBase中添加一次
            if(puts.size() > 1000) {
                table.put(puts);
                puts.clear();
            }
        }
        long endTime = System.currentTimeMillis();
        // cost:15465
        System.out.println("cost:" + (endTime - startTime));
    }

    /**
     * @return 产生刘伟大写字母的随机密码
     */
    private byte[] getPassword(){
        StringBuilder sb = new StringBuilder(6);
        for (int i = 0; i < 6; i++) {
            char c = (char) (Math.random() * 26 + 65);
            sb.append(c);
        }
        return sb.toString().getBytes(StandardCharsets.UTF_8);
    }

    /**
     * 创建表
>>>>>>> d7e566b810f528936d49195e0192b7bd67629159
     */
    @Test
    public void createTable() throws IOException {
        // 列族
        ColumnFamilyDescriptor columnFamily1 = ColumnFamilyDescriptorBuilder.newBuilder("basic".getBytes(StandardCharsets.UTF_8)).build();
        ColumnFamilyDescriptor columnFamily2 = ColumnFamilyDescriptorBuilder.newBuilder("info".getBytes(StandardCharsets.UTF_8)).build();
        // 创建表，添加到default命名空间下，并且定义两个列族
        // 如果命名空间不存在，则会报错
        TableDescriptor table = TableDescriptorBuilder.newBuilder(TableName.valueOf("default", "users"))
                .setColumnFamily(columnFamily1).setColumnFamily(columnFamily2).build();
        admin.createTable(table);
    }

    /**
<<<<<<< HEAD
     * 查看所有namespace下的表
     * @throws IOException
=======
     * 删除表前要先disabled
>>>>>>> d7e566b810f528936d49195e0192b7bd67629159
     */
    @Test
    public void deleteTable() throws IOException {
        TableName tableName = TableName.valueOf("users");
        admin.disableTable(tableName);
        admin.deleteTable(tableName);
    }

    /**
     * 描述一个表的结构
     */
    @Test
    public void desc() throws IOException {
        TableDescriptor descriptor = admin.getDescriptor(TableName.valueOf("helloworld", "hello"));
        System.out.println(descriptor);
    }

    /**
     * 描述所有表的结构
     */
    @Test
    public void descAllTables() throws IOException {
        List<TableDescriptor> tableDescriptors = admin.listTableDescriptors();
        for (int i = 0; i < tableDescriptors.size(); i++) {
            TableDescriptor tableDescriptor = tableDescriptors.get(i);
            System.out.println(tableDescriptor);
        }
    }

    /**
     * 查看所有namespace下的表
     * @throws IOException
     */
    @Test
    public void exist() throws IOException {
        boolean isExists = admin.tableExists(TableName.valueOf("person"));
        System.out.println(isExists);
    }

    @Test
    public void putData() throws IOException {
        // 构建Append对象
        byte[] rowkey = "2".getBytes(StandardCharsets.UTF_8);
        Put put = new Put(rowkey);
        // 指定列族
        byte[] basic = "basic".getBytes(StandardCharsets.UTF_8);
        byte[] info = "info".getBytes(StandardCharsets.UTF_8);
        // 指定列和数据
        put.addColumn(basic, "name".getBytes(StandardCharsets.UTF_8), "张三".getBytes(StandardCharsets.UTF_8));
        put.addColumn(basic, "age".getBytes(StandardCharsets.UTF_8), "10".getBytes(StandardCharsets.UTF_8));
        put.addColumn(info, "address".getBytes(StandardCharsets.UTF_8), "guangzhou".getBytes(StandardCharsets.UTF_8));
        Table table = connection.getTable(TableName.valueOf("person"));
        table.put(put);
    }

    /**
     * 追加数据到表中
     */
    @Test
    public void appendData() throws IOException {
        // 构建Append对象
        byte[] rowkey = "2".getBytes(StandardCharsets.UTF_8);
        Append append = new Append(rowkey);
        // 指定列族
        byte[] basic = "basic".getBytes(StandardCharsets.UTF_8);
        byte[] info = "info".getBytes(StandardCharsets.UTF_8);
        // 指定列和数据
        append.addColumn(basic, "name".getBytes(StandardCharsets.UTF_8), "张三".getBytes(StandardCharsets.UTF_8));
        append.addColumn(basic, "age".getBytes(StandardCharsets.UTF_8), "10".getBytes(StandardCharsets.UTF_8));
        append.addColumn(info, "address".getBytes(StandardCharsets.UTF_8), "guangzhou".getBytes(StandardCharsets.UTF_8));
        // 添加数据
        Table table = connection.getTable(TableName.valueOf("person"));
        table.append(append);
    }

    /**
     * 根据行键删除删除某一个列的记录
     */
    @Test
    public void deleteRow() throws IOException {
        Table table = connection.getTable(TableName.valueOf("person"));
        String rowkey = "2";
        table.delete(new Delete(rowkey.getBytes(StandardCharsets.UTF_8)));
    }

    /**
     * 删除某列
     */
    @Test
    public void deleteCell() throws IOException {
        Table table = connection.getTable(TableName.valueOf("person"));
        String rowkey = "2";
        Delete delete = new Delete(rowkey.getBytes(StandardCharsets.UTF_8));
        delete.addColumn("basic".getBytes(StandardCharsets.UTF_8), "name".getBytes(StandardCharsets.UTF_8));
        // delete 'person', '2', 'basic:name'
        table.delete(delete);
    }

    /**
     * 根据行键获取该行的记录
     */
    @Test
    public void get() throws IOException {
        Table table = connection.getTable(TableName.valueOf("person"));
        String rowkey = "2";
        Result result = table.get(new Get(rowkey.getBytes(StandardCharsets.UTF_8)));
        this.extractResult(result);
    }

    /**
     * 扫描指定的表
     */
    @Test
    public void scan() throws IOException {
        Table table = connection.getTable(TableName.valueOf("person"));
        ResultScanner scanner = table.getScanner(new Scan());
        for (Result result : scanner) {
            this.extractResult(result);
        }
    }

    /**
     * 参考博客：http://itindex.net/detail/60348-hbase-%E7%BB%9F%E8%AE%A1-rowcount
     */
    @Test
    public void count() throws IOException {
        Table table = connection.getTable(TableName.valueOf("person"));
        Scan scan = new Scan();
        // KeyOnlyFilter只查key，提高count速度
        scan.setFilter(new KeyOnlyFilter());
        StopWatch stopWatch = new StopWatch();
        stopWatch.start();
        int rowCount = 0;
        ResultScanner rs = table.getScanner(scan);
        for (Result result : rs) {
            rowCount += result.size();
        }
        stopWatch.stop();
        System.out.println("cost: " + stopWatch.getTime());
        System.out.println(rowCount);
    }

    @Test
    public void filter1() throws IOException {
        Scan scan = new Scan();
        scan.setFilter(new ValueFilter(CompareOperator.EQUAL, new RegexStringComparator(".*AAA.*")));

        Table table = connection.getTable(TableName.valueOf("person"));
        StopWatch stopWatch = new StopWatch();
        stopWatch.start();
        ResultScanner scanner = table.getScanner(scan);
        stopWatch.stop();
        System.out.println("cost:" + stopWatch.getTime());
        for (Result result : scanner) {
            this.extractResult(result);
        }
    }

    /**
     * 这个person目前是只有password一个列，能不能当某行数据的某列为空，某列为xxx，某列like这样子？
     */
    @Test
    public void filter2() throws IOException {
        Scan scan = new Scan();
        // scan 是全局扫描，addColumn可以限定具体的列或者列簇，由于列簇和列比较多，影响开销。大概是这样，代码时间太久了
        FilterList filterList = new FilterList();
        filterList.addFilter(new SingleColumnValueFilter("basic".getBytes(StandardCharsets.UTF_8), "password1".getBytes(StandardCharsets.UTF_8),
                CompareOperator.EQUAL, new RegexStringComparator(".*AAA.*")));
        filterList.addFilter(new SingleColumnValueFilter("basic".getBytes(StandardCharsets.UTF_8), "password2".getBytes(StandardCharsets.UTF_8),
                CompareOperator.EQUAL, new RegexStringComparator("FOCPZE")));
        scan.setFilter(filterList);
        Table table = connection.getTable(TableName.valueOf("person"));
        ResultScanner scanner = table.getScanner(scan);
        for (Result result : scanner) {
            this.extractResult(result);
        }
    }

    /**
     * 打印hbase的行
     *
     * @param result
     * @throws IOException
     */
    private void extractResult(Result result) throws IOException {
        CellScanner cellScanner = result.cellScanner();
        //使用迭代器遍历元素 advance判断是否有下一个元素
        while (cellScanner.advance()) {
            //取出当前单元格
            Cell current = cellScanner.current();
            System.out.println("\n" + new String(CellUtil.cloneRow(current)) +
                    "\t" + new String(CellUtil.cloneFamily(current)) +
                    "\t" + new String(CellUtil.cloneQualifier(current)) +
                    "\t" + new String(CellUtil.cloneValue(current)));
        }
    }

    /**
     * 查询所有命名空间下的table，除了hbase命名空间。hbase命名空间存放的是hbase的配置
     */
    @Test
    public void listTableNames() throws IOException {
        TableName[] tableNames = admin.listTableNames();
        for (TableName tableName : tableNames) {
            System.out.println(tableName);
        }
    }


}
